package com.factory.aiclient.manager;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.factory.aiclient.feign.RestClient;
import com.factory.aiclient.service.UserDeviceService;
import com.factory.aiclient.util.SpringCtxUtils;
import com.factory.common.bean.UserDevices;
import com.factory.common.pojo.Result;
import com.factory.common.pojo.SendData;
import com.factory.common.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;

import javax.websocket.Session;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * websocket管理
 */
@Slf4j
public class SocketManager {

    /**
     * 客户端连接session集合 泛型数据< deviceId , DeviceSession >
     */
    private static Map<String, DeviceSession> sessionMap = new ConcurrentHashMap<String, DeviceSession>();

    /**
     * 客户端连接消息集合 < deviceId, Message >
     */
    private static Map<String, Object> messageMap = new ConcurrentHashMap<>();

    /**
     * rest 微服务调用
     */
    private RestClient restClient = SpringCtxUtils.getBean(RestClient.class);

    /**
     * 将新消息缓存到消息列表里面
     *
     * @param messageId
     * @param data
     */
    public static void setMessage(String messageId, Object data) {
        if (StringUtil.notEmpty(messageId)) {
            log.debug("存放客户端消息,{}", data);
            messageMap.put(messageId, data);
        }
    }

    /**
     * 获取消息
     *
     * @param messageId
     * @return
     */
    public static Object getMessage(String messageId) {

        if (!sessionMap.containsKey(messageId))
            return null;

        return messageMap.remove(messageId);
    }

    /**
     * 存放新加入的session 便于以后对session管理
     *
     * @param deviceId 设备id
     * @param session  用户session
     */
    public synchronized static void setSession(String deviceId, Session session, UserDevices userDevices) {
        sessionMap.put(deviceId, new DeviceSession(session, userDevices));
    }

    /**
     * 获取新加入的session 便于以后对session管理
     *
     * @param id 设备id
     */
    public synchronized static Session getSession(String id) {
        return sessionMap.get(id).getSession();
    }

    /**
     * 根据session ID移除session
     *
     * @param session
     */
    public static void closeSession(Session session) {

        sessionMap.forEach((k, v) -> {
            if (null != v.getSession() && v.getSession().getId().equals(session.getId())) {
                DeviceSession remove = sessionMap.remove(k);
                log.info("关闭的设备连接,{}",remove.getUserDevices().getId());
            }
        });

        if (session.isOpen()) {
            try {
                session.close();
            } catch (IOException e) {
                log.error("关闭session时发生异常,{}", e);
            }
        }
    }

    private static void sendMessage0(String id, String message) {
        DeviceSession deviceSession = sessionMap.get(id);
        if (null == deviceSession) {
            log.warn("deviceSession is null,{}", id);
            return;
        }

        Session session = deviceSession.getSession();
        if (session != null) {
            if (!session.isOpen()) {
                log.warn("session is closed,{}", id);
                closeSession(session);
                return;
            }
            synchronized (session) {
                try {
                    log.debug("发送websocket数据到客户端,{},{}", message, id);
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    log.info("发送websocket消息是异常,{}", e);
                }
            }
        }
    }

    /**
     * 给用户发送消息
     *
     * @param id      用户id
     * @param message 消息
     */
    public static void sendMessage(String id, SendData message) {
        sendMessage0(id, JSON.toJSONString(message));
    }

    /**
     * 给用户终端设备同步消息消息
     *
     * @param deviceId 用户终端设备id
     * @param message  消息体
     */
    public static Result sendSyncMessage(String deviceId, SendData message) {
        sendMessage(deviceId, message);
        return readMessage(message.getMessageId());
    }

    /**
     * 通过session id 获取设备的 id
     *
     * @param sessionId
     * @return 设备id
     */
    public static String getDeviceIdBySession(String sessionId) {
        Iterator<DeviceSession> iterator = sessionMap.values().iterator();

        while (iterator.hasNext()) {
            DeviceSession deviceSession = iterator.next();
            if (sessionId.equals(deviceSession.getSession().getId())) {
                return deviceSession.getUserDevices().getId();
            }
        }
        return null;
    }

    /**
     * 读取终端发送的消息
     *
     * @param messageId 终端id
     */
    public static Result readMessage(String messageId) {
        log.info("读取终端发送的消息,{}", messageId);

        for (int i = 1; ; i++) {

            Object data = messageMap.remove(messageId);

            log.debug("读取到的终端发送的消息,{}", data);

            if (StringUtil.notEmpty(data)) {
                return Result.success(data);
            } else if (i >= 30) {
                return Result.fail("request websocket message timeout!");
            }

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                log.error("读取消息是发生异常,{}", e);
            }
        }
    }

    /**
     * 给用户发送消息
     *
     * @param id 用户id
     */
    public static Boolean isOnLine(String id) {
        return sessionMap.containsKey(id);
    }

    /**
     * 获取在线的设备列表
     *
     * @return
     */
    public static Set<String> getOnLines() {
        return sessionMap.keySet();
    }

    /**
     * 获取在线的设备列表
     *
     * @return
     */
    public static Set<String> getOnLines(String userId) {
        Set<String> onLines = new HashSet<>();
        sessionMap.forEach((k, v) -> {
            if (null != userId && userId.equals(v.getUserDevices().getUserId())) {
                onLines.add(k);
            }
        });
        return onLines;
    }
}